package onion.mqtt.server.manager;

import onion.mqtt.server.store.SubscribeStore;
import org.apache.commons.lang3.ObjectUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/**
 * Process-wide registry of MQTT subscriptions, keyed by topic filter.
 *
 * <p>Thread-safety: every mutating method is {@code synchronized} on the singleton instance.
 * Lists stored in the map are never modified after they have been published; a mutator builds
 * a fresh list and swaps it into the {@link ConcurrentHashMap}. This lets
 * {@link #searchSubscribe(String)} iterate without taking a lock.
 *
 * @author Mr, Lu
 * @developmentTeam 浙江允泽信息科技有限公司
 * @createTime 2023/12/12
 */
public class SubscribeManager {
    static final Logger log = LoggerFactory.getLogger(SubscribeManager.class);
    private static volatile SubscribeManager INSTANCE;

    /**
     * Topic filter -> subscribers of that filter. Values are immutable-by-convention snapshots:
     * replace the whole list instead of mutating it in place. Empty lists are never stored.
     */
    private static final Map<String, List<SubscribeStore>> subscribeMap = new ConcurrentHashMap<>();

    private SubscribeManager() {}

    /**
     * Returns the lazily created singleton instance.
     *
     * @return the shared {@code SubscribeManager}
     */
    public static SubscribeManager getInstance() {
        if (INSTANCE == null) {
            synchronized (SubscribeManager.class) {
                if (INSTANCE == null) {
                    INSTANCE = new SubscribeManager();
                }
            }
        }
        return INSTANCE;
    }

    /**
     * Registers a subscription under its topic filter.
     *
     * <p>If the same client (non-null client id) already holds a subscription for the identical
     * topic filter, the old entry is replaced instead of being duplicated, so the client does not
     * receive each message twice.
     *
     * @param subscribeStore the subscription to add; must not be {@code null} and must carry a
     *                       non-null topic
     * @throws NullPointerException if {@code subscribeStore} or its topic is {@code null}
     */
    public synchronized void addSubscribe(SubscribeStore subscribeStore) {
        String topic = subscribeStore.getTopic();
        String clientId = subscribeStore.getClientId();

        List<SubscribeStore> current = subscribeMap.get(topic);
        List<SubscribeStore> updated = new ArrayList<>();
        if (current != null) {
            for (SubscribeStore existing : current) {
                if (!isOwnedBy(existing, clientId)) {
                    updated.add(existing);
                }
            }
        }
        updated.add(subscribeStore);
        // Swap in the new snapshot; readers holding the previous list are unaffected.
        subscribeMap.put(topic, updated);

        log.debug("client subscribe, clientId: {}, topic: {}, total: {}", clientId, topic, subscribeMap.size());
    }

    /**
     * Removes every subscription registered under the given topic filter.
     * Does nothing when the filter is {@code null} or unknown.
     *
     * @param topic the exact topic filter to drop
     */
    public synchronized void removeSubscribe(String topic) {
        if (topic == null) {
            return;
        }
        List<SubscribeStore> removed = subscribeMap.remove(topic);
        if (removed == null) {
            return;
        }
        for (SubscribeStore store : removed) {
            log.debug("client unSubscribe, clientId: {}, topic: {}, total: {}", store.getClientId(), topic, subscribeMap.size());
        }
    }

    /**
     * Removes the given client's subscription to one topic filter.
     * Does nothing when either argument is {@code null} or the filter is unknown.
     *
     * @param clientId the client whose subscription is removed
     * @param topic    the exact topic filter the client subscribed with
     */
    public synchronized void removeSubscribeByClient(String clientId, String topic) {
        if (clientId == null || topic == null) {
            return;
        }
        List<SubscribeStore> current = subscribeMap.get(topic);
        if (current != null) {
            dropClient(topic, current, clientId);
        }
    }

    /**
     * Removes all subscriptions held by the given client, across every topic filter.
     * Does nothing when {@code clientId} is {@code null}.
     *
     * @param clientId the client whose subscriptions are removed
     */
    public synchronized void clearSubscribeByClient(String clientId) {
        if (clientId == null) {
            return;
        }
        // ConcurrentHashMap iterators tolerate put/remove on the map while iterating.
        for (Map.Entry<String, List<SubscribeStore>> entry : subscribeMap.entrySet()) {
            dropClient(entry.getKey(), entry.getValue(), clientId);
        }
    }

    /**
     * Finds every subscription whose topic filter matches the given topic name, including
     * exact matches and the {@code +} / {@code #} wildcards.
     *
     * <p>Each stored subscription appears at most once in the result. A client holding several
     * overlapping filters is still returned once per matching filter.
     *
     * @param topic the concrete topic name of a published message
     * @return a new mutable list of matching subscriptions; empty when nothing matches or
     *         {@code topic} is {@code null}
     */
    public List<SubscribeStore> searchSubscribe(String topic) {
        List<SubscribeStore> matched = new ArrayList<>();
        if (topic == null) {
            return matched;
        }
        // Split once; limit -1 keeps trailing empty levels so "a/" and "a" stay distinct.
        String[] topicLevels = topic.split("/", -1);
        boolean reservedTopic = topic.startsWith("$");
        for (Map.Entry<String, List<SubscribeStore>> entry : subscribeMap.entrySet()) {
            if (matches(entry.getKey(), topicLevels, reservedTopic)) {
                matched.addAll(entry.getValue());
            }
        }
        return matched;
    }

    /**
     * Publishes a snapshot of {@code current} without the given client's entries, or removes the
     * map entry entirely when no subscriber is left. Leaves the map untouched when the client
     * was not subscribed. Callers must hold the instance lock.
     */
    private static void dropClient(String topic, List<SubscribeStore> current, String clientId) {
        List<SubscribeStore> remaining = new ArrayList<>(current.size());
        for (SubscribeStore store : current) {
            if (!isOwnedBy(store, clientId)) {
                remaining.add(store);
            }
        }
        if (remaining.size() == current.size()) {
            return;
        }
        if (remaining.isEmpty()) {
            subscribeMap.remove(topic);
        } else {
            subscribeMap.put(topic, remaining);
        }
    }

    /** Returns whether {@code store} belongs to {@code clientId}; a null id owns nothing. */
    private static boolean isOwnedBy(SubscribeStore store, String clientId) {
        return clientId != null && clientId.equals(store.getClientId());
    }

    /**
     * Tests a topic filter against an already split topic name, level by level.
     *
     * <ul>
     *   <li>{@code +} matches exactly one level.</li>
     *   <li>{@code #} is honoured only as the last level; it matches the parent level and any
     *       number of child levels.</li>
     *   <li>Without {@code #}, filter and topic must have the same number of levels.</li>
     *   <li>A filter starting with a wildcard never matches a topic starting with {@code $}.</li>
     * </ul>
     *
     * @param topicFilter   the stored subscription filter
     * @param topicLevels   the published topic split on {@code /} with trailing levels preserved
     * @param reservedTopic whether the published topic starts with {@code $}
     * @return {@code true} when the filter matches the topic
     */
    private static boolean matches(String topicFilter, String[] topicLevels, boolean reservedTopic) {
        String[] filterLevels = topicFilter.split("/", -1);
        if (reservedTopic && ("#".equals(filterLevels[0]) || "+".equals(filterLevels[0]))) {
            return false;
        }
        for (int i = 0; i < filterLevels.length; i++) {
            String level = filterLevels[i];
            if ("#".equals(level)) {
                // Everything before '#' has matched already; '#' elsewhere than last is invalid.
                return i == filterLevels.length - 1;
            }
            if (i >= topicLevels.length) {
                return false;
            }
            if (!"+".equals(level) && !level.equals(topicLevels[i])) {
                return false;
            }
        }
        return filterLevels.length == topicLevels.length;
    }
}
